代码重构:合理规划 Queue 分类
上节在业务模块中直接注册和使用了队列,但这不是最终的生产级方案。本节将队列注册逻辑重构为独立的 QModule,集中管理所有队列分类,并通过 .env 配置文件动态读取 Redis 连接信息。同时梳理了实际项目中常见的队列分类体系。
重构目标
- 将队列注册逻辑从业务模块中抽离到独立的
QModule - 使用
forRootAsync从ConfigService动态获取 Redis 配置 - 按业务类型集中注册多个队列分类
- 统一管理所有 Consumer
使用 forRootAsync 动态配置
将硬编码的 Redis 配置改为从 .env 文件动态读取:
// q.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { ConfigService } from '@nestjs/config';
@Module({
imports: [
BullModule.forRootAsync({
useFactory: (configService: ConfigService) => ({
redis: {
host: configService.get('REDIS_HOST', 'localhost'),
port: configService.get<number>('REDIS_PORT', 6379),
password: configService.get('REDIS_PASSWORD'),
},
}),
inject: [ConfigService],
}),
// Register queues by business type
BullModule.registerQueue(
{ name: 'email' },
{ name: 'data_processing' },
{ name: 'realtime_messages' },
{ name: 'image_processing' },
{ name: 'scheduled_tasks' },
{ name: 'order_processing' },
),
],
exports: [BullModule],
})
export class QModule {}
typescript
常见队列分类体系
在实际项目中,通常需要按业务类型划分不同的队列:
| 队列名称 | 用途 | 典型场景 |
|---|---|---|
email | 邮件相关任务 | 注册确认邮件、密码重置邮件 |
data_processing | 数据处理 | 数据导入导出、报表生成 |
realtime_messages | 实时消息 | 站内消息、WebSocket 推送 |
image_processing | 图片处理 | 图片上传、压缩、缩略图生成 |
scheduled_tasks | 定时任务 | 秒级精度的定时通知、延迟消息 |
order_processing | 订单处理 | 信用卡账单确认、库存扣减 |
分类的设计原则:
- 每个队列有独立的 Consumer,可以独立控制并发数和重试策略
- 不同类型的任务不会互相阻塞
- 可以按队列维度监控任务积压和处理速度
Consumer 目录结构
在每个队列下创建对应的 Consumer 文件:
q/
├── services/
│ ├── email.consumer.ts
│ ├── data-processing.consumer.ts
│ ├── realtime-messages.consumer.ts
│ ├── image-processing.consumer.ts
│ ├── scheduled-tasks.consumer.ts
│ └── order-processing.consumer.ts
├── q.module.ts
└── index.ts
text
index.ts -- 统一导出所有 Consumer:
// index.ts
import { Provider } from '@nestjs/common';
import { EmailConsumer } from './services/email.consumer';
import { ScheduledTasksConsumer } from './services/scheduled-tasks.consumer';
export const qConsumers: Provider[] = [
EmailConsumer,
ScheduledTasksConsumer,
// Add more consumers as needed
];
typescript
Consumer 示例
每个 Consumer 通过 @Processor 装饰器绑定到对应的队列名称:
// email.consumer.ts
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('email')
export class EmailConsumer {
@Process()
async handleEmail(job: Job) {
console.log('Sending email:', job.data);
// Email sending logic
}
}
// scheduled-tasks.consumer.ts
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('scheduled_tasks')
export class ScheduledTasksConsumer {
@Process()
async handleScheduledTask(job: Job) {
console.log('Processing scheduled task:', job.data);
// Scheduled task logic
}
}
typescript
重要: @Processor 装饰器中的名称必须与 registerQueue 中注册的队列名称完全一致,否则 Consumer 无法接收任务。
在业务模块中使用
重构后,业务模块只需导入 QModule 即可使用队列:
// app.module.ts
import { QModule } from './common/conditional/q/q.module';
@Module({
imports: [QModule],
})
export class AppModule {}
typescript
在 Controller 中注入指定名称的队列添加任务:
@Controller()
export class AppController {
constructor(
@InjectQueue('email') private emailQueue: Queue,
@InjectQueue('scheduled_tasks') private scheduledQueue: Queue,
) {}
@Post('send-email')
async sendEmail() {
await this.emailQueue.add('send', { to: 'user@example.com' });
return { status: 'queued' };
}
}
typescript
Conditional 条件加载
队列模块作为可选功能,通过 .env 中的 QUEUE_ON 开关控制是否注册:
// conditional.module.ts
if (config.queue_on) {
imports.push(QModule);
}
typescript
这样在不需要队列功能的环境中(如开发调试),可以通过配置关闭队列模块的加载。
本节总结
- 将队列注册从业务模块重构到独立的
QModule,实现关注点分离 - 使用
forRootAsync+ConfigService从.env动态获取 Redis 连接配置 - 按业务类型(邮件、数据处理、实时消息、图片处理、定时任务、订单处理)规划队列分类
- Consumer 与队列名称严格对应,确保任务正确路由
- 通过
index.ts统一导出 Consumer,简化模块注册
↑